582bdba4b201e6ab8e2a9a05cff3566f1bab9dce,src/java/org/apache/cassandra/streaming/StreamReader.java,StreamReader,read,#ReadableByteChannel#,85

Before Change


        // Value returned by totalSize(); presumably the number of bytes expected
        // from the stream -- confirm against totalSize().
        long totalSize = totalSize();

        // Look up the pair of names the schema holds for cfId. The pair's left
        // element is handed to Keyspace.open and its right element to
        // getColumnFamilyStore below.
        Pair<String, String> kscf = Schema.instance.getCF(cfId);
        if (kscf == null)
        {
            // schema was dropped during streaming
            throw new IOException("CF " + cfId + " was dropped during streaming");
        }
        // NOTE(review): the result of getColumnFamilyStore is not null-checked
        // here; only the name pair is. Confirm whether it can return null.
        ColumnFamilyStore cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);

        // Build the input chain: channel -> InputStream -> LZFInputStream ->
        // DataInputStream, then wrap it in a BytesReadTracker.
        DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
        BytesReadTracker in = new BytesReadTracker(dis);

After Change


        // Size the read loop below compares against in.getBytesRead() to decide
        // when to stop reading.
        long totalSize = totalSize();

        // Resolve cfId to its name pair, then to the ColumnFamilyStore. cfs stays
        // null when the name pair is missing.
        Pair<String, String> kscf = Schema.instance.getCF(cfId);
        ColumnFamilyStore cfs = null;
        if (kscf != null)
            cfs = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);

        // Fail if either lookup produced null, so cfs is non-null from here on.
        if (kscf == null || cfs == null)
        {
            // schema was dropped during streaming
            throw new IOException("CF " + cfId + " was dropped during streaming");
        }

        // cfs is dereferenced for the log arguments, which is why the null check
        // above has to come first.
        logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', table = '{}'.",
                     session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(),
                     cfs.getColumnFamilyName());

        // Build the input chain: channel -> InputStream -> LZFInputStream ->
        // DataInputStream, then wrap it in a BytesReadTracker whose getBytesRead()
        // drives the loop condition and progress reporting below.
        DataInputStream dis = new DataInputStream(new LZFInputStream(Channels.newInputStream(channel)));
        BytesReadTracker in = new BytesReadTracker(dis);
        // Declared outside the try so the catch block can inspect them; each stays
        // null until it is assigned inside the try.
        SSTableWriter writer = null;
        DecoratedKey key = null;
        try
        {
            writer = createWriter(cfs, totalSize, repairedAt, format);
            while (in.getBytesRead() < totalSize)
            {
                key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
                writeRow(key, writer, in, cfs);

                // TODO move this to BytesReadTracker
                session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
            }
            logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}",
                         session.planId(), fileSeqNum, session.peer, in.getBytesRead(), totalSize);
            return writer;
        } catch (Throwable e)
        {
            if (key != null)
                logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.",
                            session.planId(), key, cfs.keyspace.getName(), cfs.getColumnFamilyName());
            if (writer != null)
            {
                try